package com.clp.protocol.iec104.server;

import com.clp.protocol.iec104.server.async.SlaveFutureListener;
import com.clp.protocol.iec104.server.async.SlaveFuture;
import com.clp.protocol.iec104.server.async.SlavePromise;
import com.clp.protocol.core.async.NSuccessPolicy;
import com.clp.protocol.core.server.NettyServer;
import com.clp.protocol.core.common.Container;
import io.netty.channel.*;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.stream.Collectors;

@Slf4j
public class InSlave implements Slave {
    private final NettyServer nettyServer;

    private final String localHost;
    private final int localPort;
    private final int rtuAddr;

    private final ChannelInitializer<ServerSocketChannel> initializer;
    private final ChannelInitializer<SocketChannel> channelInitializer;
    private final SlaveControlConfig controlConfig;
    private final SlaveDataConfig dataConfig;

    private volatile ServerChannel serverChannel;
    private final Container<InSlaveChannel> inSlaveChannels;

    private volatile SlaveFuture<Void> openFuture;
    private volatile SlaveFuture<Void> closeFuture;

    // 遥控、遥调全局唯一性保证
    private final TcTokenPool tcTokenPool = new TcTokenPool(TcToken::new);
    private final TaTokenPool taTokenPool = new TaTokenPool(TaToken::new);

    public InSlave(NettyServer nettyServer, SlaveConfig cfg) {
        this.nettyServer = nettyServer;
        this.inSlaveChannels = new Container<>();

        this.localHost = cfg.getLocalHost();
        this.localPort = cfg.getLocalPort();
        this.rtuAddr = cfg.getRtuAddr();

        this.initializer = new ChannelInitializer<ServerSocketChannel>() {
            @Override
            protected void initChannel(ServerSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        if (msg instanceof SocketChannel) {
                            SocketChannel socketChannel = (SocketChannel) msg;
                            // 读到新的socket连接
                            log.info("[{}] 新连接到达: {}", InSlave.this, socketChannel.remoteAddress());
                        }
                        ctx.fireChannelRead(msg);
                    }
                });
            }
        };
        this.channelInitializer = new ChannelInitializer<SocketChannel> () {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                InSlaveChannel inSlaveChannel = register(ch); // 注册
                if (inSlaveChannel == null) return;
                ch.closeFuture().addListener((ChannelFutureListener) future -> unregister(inSlaveChannel)); // 取消注册
            }
        };
        this.controlConfig = cfg.getControlConfig();
        this.dataConfig = cfg.getDataConfig();
    }

    /**
     * 注册新的子站通道
     * @param channel
     * @return
     */
    @Nullable
    private InSlaveChannel register(SocketChannel channel) {
        InSlaveChannel inSlaveChannel = new InSlaveChannel(this, channel, controlConfig, dataConfig);
        boolean isAdded = inSlaveChannels.add(inSlaveChannel);
        if (!isAdded) {
            log.warn("[{}] SlaveChannel IP连接 已存在:{}，连接即将断开", this, inSlaveChannel);
            inSlaveChannel.channel().close();
            return null;
        }
        log.info("[{}] 注册了一个新的SlaveChannel:{}", this, inSlaveChannel);
        return inSlaveChannel;
    }

    /**
     * 移除新的子站通道
     * @param inSlaveChannel
     * @return
     */
    private void unregister(InSlaveChannel inSlaveChannel) {
        boolean isRemoved = inSlaveChannels.remove(inSlaveChannel);
        if (!isRemoved) {
            log.warn("[{}] 移除SlaveChannel失败:{}", this, inSlaveChannel);
            return;
        }
        log.info("[{}] 移除了一个SlaveChannel:{}", this, inSlaveChannel);
    }

    /**
     * 子站启动，打开端口，幂等
     * @return
     */
    public SlaveFuture<Void> open() {
        // 已经打开，则直接返回成功
        if (isOpen()) {
            return newPromise(Void.class).setSuccess();
        }

        // 正在打开，则返回 future
        if (openFuture != null) {
            return openFuture;
        }

        // 未打开，进行打开操作
        synchronized (this) {
            // 双重检查：已经打开，则直接返回成功
            if (isOpen()) {
                return newPromise(Void.class).setSuccess();
            }

            // 双重检查：正在打开，则返回 future
            if (openFuture != null) {
                return openFuture;
            }

            SlavePromise<Void> openPromise = newPromise(Void.class);
            openFuture = openPromise;
            ChannelFuture bindFuture = nettyServer.bind(localHost(), localPort(), initializer, channelInitializer);
            bindFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        serverChannel = (ServerChannel) future.channel();
                        openPromise.setSuccess();
                    } else {
                        openPromise.setFailure(future.cause());
                    }
                }
            });
            return openPromise.addListener(new SlaveFutureListener<Void>() {
                @Override
                public void operationComplete(SlaveFuture<Void> future) {
                    synchronized (this) {
                        openFuture = null;
                    }
                }
            });
        }
    }

    @Override
    public synchronized boolean isOpen() {
        return serverChannel != null && serverChannel.isOpen();
    }

    TcTokenPool getTcTokenPool() {
        return tcTokenPool;
    }

    TaTokenPool getTaTokenPool() {
        return taTokenPool;
    }

    public SlaveFuture<Void> close() {
        // 已经关闭，则直接返回成功
        if (!isOpen()) {
            return newPromise(Void.class).setSuccess();
        }

        // 正在关闭，则返回 future
        if (closeFuture != null) {
            return closeFuture;
        }

        // 未关闭，进行关闭操作
        synchronized (this) {
            // 双重检查：已经关闭，则直接返回成功
            if (!isOpen()) {
                return newPromise(Void.class).setSuccess();
            }

            // 双重检查：正在关闭，则返回 future
            if (closeFuture != null) {
                return closeFuture;
            }

            SlavePromise<Void> closePromise = newPromise(Void.class);
            closeFuture = closePromise;
            serverChannel.close().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        closePromise.setFailure(future.cause());
                        return;
                    }

                    // 关闭所有的 socketChannel
                    List<ChannelFuture> childFutures = inSlaveChannels.stream()
                            .map(inSlaveChannel -> inSlaveChannel.channel().close()).collect(Collectors.toList());
                    NSuccessPolicy.ALL_IS_SUCCESS.apply3(closePromise, childFutures); // 要求所有的通道都要关闭
                }
            });

            return closePromise.addListener(new SlaveFutureListener<Void>() {
                @Override
                public void operationComplete(SlaveFuture<Void> future) {
                    synchronized (this) {
                        closeFuture = null;
                    }
                }
            });
        }
    }

    @Override
    public Executor executor() {
        return nettyServer.scheduledExecutorService();
    }

    @Override
    public String localHost() {
        return localHost;
    }

    @Override
    public int localPort() {
        return localPort;
    }

    @Override
    public int rtuAddr() {
        return rtuAddr;
    }

    @Override
    public void forEachSlaveChannel(Consumer<SlaveChannel> consumer) {
        inSlaveChannels.forEach(consumer);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        InSlave that = (InSlave) o;
        return localPort == that.localPort && Objects.equals(localHost, that.localHost);
    }

    @Override
    public int hashCode() {
        return Objects.hash(localHost, localPort);
    }

    @Override
    public String toString() {
        return "InSlave:<" + localHost + ":" + localPort + ">";
    }
}
